[備忘録] Kinesis FirehoseのLambdaによるデータ変換について

[備忘録] Kinesis FirehoseのLambdaによるデータ変換について

Clock Icon2020.05.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部の夏目です。

案件でFirehoseのLambdaによるデータ変換を使ったのですが、FirehoseのドキュメントがLambdaのBluePrint(設計図)を使ってねぐらいの書き方で、どんなeventが来てどんなresponseを返さないといけないのか詳しく書いてなかったので、備忘録として残しておきます。

Lambdaの設定について

  • LambdaにつけるIAM Roleでは特に必要と言えるものはない
  • タイムアウトは5分までしか対応していない (これはドキュメントに記載されている)

Lambdaに渡されるEventについて

{
  "invocationId": "b384e3cf-8ae0-47ac-8e2a-8e4a9e59941e",
  "sourceKinesisStreamArn": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/firehose_ln",
  "deliveryStreamArn": "arn:aws:firehose:ap-northeast-1:000000000000:deliverystream/firehose_ln",
  "region": "ap-northeast-1",
  "records": [
    {
      "recordId": "49607016322531838770965472870713485392692964656771235842000000",
      "approximateArrivalTimestamp": 1589518484529,
      "data": "eyJhIjogImM3NjNlM2I5LWI1NDQtNDg4NS04YWM1LWQ3ZDg4MjJjNGNmYyIsICJiIjogImQ1NjNiYWU0LTYwZTItNDhiOC1hZTQ0LTdkYjU2ZTZlNTZkNSIsICJjIjogIjljZjNkYmE3LTY2NTEtNGE2MC1iOGJhLThkNmFhMGYwMjNkMiJ9",
      "kinesisRecordMetadata": {
        "sequenceNumber": "49607016322531838770965472870713485392692964656771235842",
        "subsequenceNumber": 0,
        "partitionKey": "d39ae9bc-27ba-4208-92c8-98c607b84c46",
        "shardId": "shardId-000000000000",
        "approximateArrivalTimestamp": 1589518484529
      }
    },
    {
      "recordId": "49607016322531838770965472870714694318512579354665418754000000",
      "approximateArrivalTimestamp": 1589518485244,
      "data": "eyJhIjogIjUxNDJhMGUwLTdiMWYtNGMxNS1iZjBiLThhMjU2ZmQzZjkyYyIsICJiIjogImNkZTVmNWRmLWUzMGMtNGE2Yi1iNjJjLWZiNmY3Zjk1NTYzNSIsICJjIjogIjljN2YyMWRlLWU2MWItNGJhZi05YjQ0LTg2MmE0YmVhZTVlMiJ9",
      "kinesisRecordMetadata": {
        "sequenceNumber": "49607016322531838770965472870714694318512579354665418754",
        "subsequenceNumber": 0,
        "partitionKey": "75c67f74-41be-4be7-8d64-868712be1773",
        "shardId": "shardId-000000000000",
        "approximateArrivalTimestamp": 1589518485244
      }
    }
  ]
}
  • 基本的に使用するのは、recordsの中身
    • 特にrecordIdはresponseで使用する
  • recordsの各要素のdataは各データをBase64エンコードしたもの

Lambdaが返す値について

{
  "records": [
    {
      "recordId": "49607016322531838770965472870713485392692964656771235842000000",
      "result": "Ok",
      "data": "eyJhIjogImM3NjNlM2I5LWI1NDQtNDg4NS04YWM1LWQ3ZDg4MjJjNGNmYyIsICJiIjogImQ1NjNiYWU0LTYwZTItNDhiOC1hZTQ0LTdkYjU2ZTZlNTZkNSIsICJjIjogIjljZjNkYmE3LTY2NTEtNGE2MC1iOGJhLThkNmFhMGYwMjNkMiJ9"
    },
    {
      "recordId": "49607016322531838770965472870714694318512579354665418754000000",
      "result": "Ok",
      "data": "eyJhIjogIjUxNDJhMGUwLTdiMWYtNGMxNS1iZjBiLThhMjU2ZmQzZjkyYyIsICJiIjogImNkZTVmNWRmLWUzMGMtNGE2Yi1iNjJjLWZiNmY3Zjk1NTYzNSIsICJjIjogIjljN2YyMWRlLWU2MWItNGJhZi05YjQ0LTg2MmE0YmVhZTVlMiJ9"
    }
  ]
}
  • Lambdaのresponseは上記形式になっている必要がある
    • 自分はrecordsの配列だけを返していて、形式が異なると怒られた
  • recordsの各要素については、ドキュメントに詳しく記載されている
  • dataはBase64でエンコードしたもの

Firehoseの設定について (CloudFormation)

  # Lambdaによるデータ変換において必要そうな要素のみ抜粋
  DeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !Ref ConvertLambda.Alias
                - ParameterName: NumberOfRetries
                  ParameterValue: "3"
                - ParameterName: RoleArn
                  ParameterValue: !GetAtt InvokeLambdaRole.Arn
                - ParameterName: BufferSizeInMBs
                  ParameterValue: "1"
                - ParameterName: BufferIntervalInSeconds
                  ParameterValue: "60"
        S3BackupMode: Enabled
        S3BackupConfiguration:
          RoleARN: !GetAtt DeliveryRole.Arn
          BucketARN: arn:aws:s3:::delivery-bucket
          Prefix: Backup/
          ErrorOutputPrefix: "!{firehose:error-output-type}/"
          BufferingHints:
            SizeInMBs: 5
            IntervalInSeconds: 300
          CompressionFormat: UNCOMPRESSED
  • Lambdaでデータ変換を行う場合、各DestinationConfigurationの中のProcessingConfigurationで設定を記述する
    • 上記例ではExtendedS3DestinationConfigurationを使用している
  • 変換で呼び出すLambdaなどは、Processors[].Parametersのパラメータとして指定する
  • RoleArnでは変換に使用するLambdaのInvoke権限が必要
  • 変更に失敗したときのために、S3Backupの設定をいれておくと良さそう

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.